#include <test/jtx.h>
#include <test/jtx/WSClient.h>

#include <xrpl/json/json_reader.h>
#include <xrpl/json/to_string.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/server/Port.h>

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/websocket.hpp>

#include <iostream>
#include <string>
#include <unordered_map>

namespace ripple {
namespace test {

class WSClientImpl : public WSClient
{
    using error_code = boost::system::error_code;

    struct msg
    {
        Json::Value jv;

        explicit msg(Json::Value&& jv_) : jv(jv_)
        {
        }
    };

    static boost::asio::ip::tcp::endpoint
    getEndpoint(BasicConfig const& cfg, bool v2)
    {
        auto& log = std::cerr;
        ParsedPort common;
        parse_Port(common, cfg["server"], log);
        auto const ps = v2 ? "ws2" : "ws";
        for (auto const& name : cfg.section("server").values())
        {
            if (!cfg.exists(name))
                continue;
            ParsedPort pp;
            parse_Port(pp, cfg[name], log);
            if (pp.protocol.count(ps) == 0)
                continue;
            using namespace boost::asio::ip;
            if (pp.ip && pp.ip->is_unspecified())
                *pp.ip = pp.ip->is_v6() ? address{address_v6::loopback()}
                                        : address{address_v4::loopback()};

            if (!pp.port)
                Throw<std::runtime_error>("Use fixConfigPorts with auto ports");

            return {*pp.ip, *pp.port};
        }
        Throw<std::runtime_error>("Missing WebSocket port");
        return {};  // Silence compiler control paths return value warning
    }

    template <class ConstBuffers>
    static std::string
    buffer_string(ConstBuffers const& b)
    {
        using boost::asio::buffer;
        using boost::asio::buffer_size;
        std::string s;
        s.resize(buffer_size(b));
        buffer_copy(buffer(&s[0], s.size()), b);
        return s;
    }

    boost::asio::io_context ios_;
    std::optional<boost::asio::executor_work_guard<
        boost::asio::io_context::executor_type>>
        work_;
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
    std::thread thread_;
    boost::asio::ip::tcp::socket stream_;
    boost::beast::websocket::stream<boost::asio::ip::tcp::socket&> ws_;
    boost::beast::multi_buffer rb_;

    bool peerClosed_ = false;

    // synchronize destructor
    bool b0_ = false;
    std::mutex m0_;
    std::condition_variable cv0_;

    // synchronize message queue
    std::mutex m_;
    std::condition_variable cv_;
    std::list<std::shared_ptr<msg>> msgs_;

    unsigned rpc_version_;

    void
    cleanup()
    {
        boost::asio::post(
            ios_, boost::asio::bind_executor(strand_, [this] {
                if (!peerClosed_)
                {
                    ws_.async_close(
                        {},
                        boost::asio::bind_executor(strand_, [&](error_code) {
                            try
                            {
                                stream_.cancel();
                            }
                            catch (boost::system::system_error const&)
                            {
                                // ignored
                            }
                        }));
                }
            }));
        work_ = std::nullopt;
        thread_.join();
    }

public:
    WSClientImpl(
        Config const& cfg,
        bool v2,
        unsigned rpc_version,
        std::unordered_map<std::string, std::string> const& headers = {})
        : work_(std::in_place, boost::asio::make_work_guard(ios_))
        , strand_(boost::asio::make_strand(ios_))
        , thread_([&] { ios_.run(); })
        , stream_(ios_)
        , ws_(stream_)
        , rpc_version_(rpc_version)
    {
        try
        {
            auto const ep = getEndpoint(cfg, v2);
            stream_.connect(ep);
            ws_.set_option(boost::beast::websocket::stream_base::decorator(
                [&](boost::beast::websocket::request_type& req) {
                    for (auto const& h : headers)
                        req.set(h.first, h.second);
                }));
            ws_.handshake(
                ep.address().to_string() + ":" + std::to_string(ep.port()),
                "/");
            ws_.async_read(
                rb_,
                boost::asio::bind_executor(
                    strand_,
                    std::bind(
                        &WSClientImpl::on_read_msg,
                        this,
                        std::placeholders::_1)));
        }
        catch (std::exception&)
        {
            cleanup();
            Rethrow();
        }
    }

    ~WSClientImpl() override
    {
        cleanup();
    }

    Json::Value
    invoke(std::string const& cmd, Json::Value const& params) override
    {
        using boost::asio::buffer;
        using namespace std::chrono_literals;

        {
            Json::Value jp;
            if (params)
                jp = params;
            if (rpc_version_ == 2)
            {
                jp[jss::method] = cmd;
                jp[jss::jsonrpc] = "2.0";
                jp[jss::ripplerpc] = "2.0";
                jp[jss::id] = 5;
            }
            else
                jp[jss::command] = cmd;
            auto const s = to_string(jp);
            ws_.write_some(true, buffer(s));
        }

        auto jv = findMsg(5s, [&](Json::Value const& jval) {
            return jval[jss::type] == jss::response;
        });
        if (jv)
        {
            // Normalize JSON output
            jv->removeMember(jss::type);
            if ((*jv).isMember(jss::status) && (*jv)[jss::status] == jss::error)
            {
                Json::Value ret;
                ret[jss::result] = *jv;
                if ((*jv).isMember(jss::error))
                    ret[jss::error] = (*jv)[jss::error];
                ret[jss::status] = jss::error;
                return ret;
            }
            if ((*jv).isMember(jss::status) && (*jv).isMember(jss::result))
                (*jv)[jss::result][jss::status] = (*jv)[jss::status];
            return *jv;
        }
        return {};
    }

    std::optional<Json::Value>
    getMsg(std::chrono::milliseconds const& timeout) override
    {
        std::shared_ptr<msg> m;
        {
            std::unique_lock<std::mutex> lock(m_);
            if (!cv_.wait_for(lock, timeout, [&] { return !msgs_.empty(); }))
                return std::nullopt;
            m = std::move(msgs_.back());
            msgs_.pop_back();
        }
        return std::move(m->jv);
    }

    std::optional<Json::Value>
    findMsg(
        std::chrono::milliseconds const& timeout,
        std::function<bool(Json::Value const&)> pred) override
    {
        std::shared_ptr<msg> m;
        {
            std::unique_lock<std::mutex> lock(m_);
            if (!cv_.wait_for(lock, timeout, [&] {
                    for (auto it = msgs_.begin(); it != msgs_.end(); ++it)
                    {
                        if (pred((*it)->jv))
                        {
                            m = std::move(*it);
                            msgs_.erase(it);
                            return true;
                        }
                    }
                    return false;
                }))
            {
                return std::nullopt;
            }
        }
        return std::move(m->jv);
    }

    unsigned
    version() const override
    {
        return rpc_version_;
    }

private:
    void
    on_read_msg(error_code const& ec)
    {
        if (ec)
        {
            if (ec == boost::beast::websocket::error::closed)
                peerClosed_ = true;
            return;
        }

        Json::Value jv;
        Json::Reader jr;
        jr.parse(buffer_string(rb_.data()), jv);
        rb_.consume(rb_.size());
        auto m = std::make_shared<msg>(std::move(jv));
        {
            std::lock_guard lock(m_);
            msgs_.push_front(m);
            cv_.notify_all();
        }
        ws_.async_read(
            rb_,
            boost::asio::bind_executor(
                strand_,
                std::bind(
                    &WSClientImpl::on_read_msg, this, std::placeholders::_1)));
    }

    // Called when the read op terminates
    void
    on_read_done()
    {
        std::lock_guard lock(m0_);
        b0_ = true;
        cv0_.notify_all();
    }
};

std::unique_ptr<WSClient>
makeWSClient(
    Config const& cfg,
    bool v2,
    unsigned rpc_version,
    std::unordered_map<std::string, std::string> const& headers)
{
    return std::make_unique<WSClientImpl>(cfg, v2, rpc_version, headers);
}

}  // namespace test
}  // namespace ripple
